Aprendizado Não-Supervisionado - Professor Mateus Mendelson
Integrantes: Nasser Santiago Boan, Emmanuel Moreira, Harlan Martins
O objetivo desse projeto é implementar um algoritmo DBSCAN (Density-based spatial clustering of applications with noise). O DBSCAN é um algoritmo de clusterização de aprendizado não-supervisionado, utilizado para procurar subsets dentro de um conjunto de dados que se assemelham em termos de suas características X. Esse método agrupa pontos que estão próximos dentro de uma área especificada, marcando como outliers aqueles que se encontram em áreas de baixa densidade.
Para que os dados estejam prontos para serem usados, devemos, inicialmente, executar o script 'make_dataset.py', conforme abaixo.
!python src/make_dataset.py
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
import matplotlib.pyplot as plt
# Load the dataset produced by src/make_dataset.py.
df = pd.read_csv('data/meu_dataset.csv')
# Notebook display of the raw dataframe.
df
# Quick visual check of the raw (cases, deaths) scatter.
fig = px.scatter(df, x='cases', y='deaths')
fig.show()
# Count distinct (cases, deaths) pairs — duplicated rows would inflate density.
len(df[['cases', 'deaths']].drop_duplicates())
# DBSCAN hyper-parameters: neighbourhood radius (eps) and the minimum number
# of neighbours (the point itself included) for a point to be a core.
eps = 100
min_samples = 6
class MyDBSCAN:
    """Minimal 2-D DBSCAN implementation (brute-force neighbourhood search).

    Parameters
    ----------
    eps : float
        Neighbourhood radius: two points are neighbours when their Euclidean
        distance is <= eps.
    min_samples : int
        Minimum number of points (the point itself included) inside the
        eps-circle for a point to qualify as a core point.
    data : array-like of shape (n, 2), optional
        Points to cluster; each row is interpreted as (x, y).
    drop_duplicates : bool
        When True, duplicate rows are removed before scanning.
    """

    def __init__(self, eps, min_samples, data=None, drop_duplicates=False):
        self.eps = eps
        self.min_samples = min_samples
        self.points = data
        # point tuple -> 'core' | 'border' | 'outlier'
        self.point_status = dict()
        self.visited = set()
        # core point -> set of points inside its eps-circle
        self.cores = dict()
        # cluster label (str) -> set of chained core points
        self.clustered_cores = dict()
        # cluster label (str) -> {'cores': set, 'borders': set}
        self.scan = dict()
        self.drop_duplicates = drop_duplicates

    def dbscan(self):
        """Run the full pipeline: find cores, chain them, attach borders."""
        self.cores = self.surround_cores()
        self.clustered_cores = self.link_cores()
        self.scan = self.grow_clusters()

    def in_circle(self, point1, point2, eps):
        """Return True when point2 lies within the eps-circle around point1.

        Fix: the original returned True or (implicitly) None; an explicit
        bool is backward compatible with all truthiness checks.
        """
        return (point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2 <= eps ** 2

    def drop_d(self):
        """Remove duplicate points in place, logging sizes before and after."""
        print(len(self.points))
        rows = [tuple(row) for row in self.points]
        self.points = np.unique(rows, axis=0)
        print(type(self.points))
        print(len(self.points))

    def surround_cores(self):
        """Classify every point as core/border/outlier.

        Returns
        -------
        dict
            Maps each core point to the set of points inside its
            eps-neighbourhood (the core itself included).
        """
        if self.drop_duplicates:
            self.drop_d()
        cores = dict()
        for point in self.points:
            point = tuple(point.tolist())
            # Already confirmed as a core: nothing left to decide.
            if self.point_status.get(point) == 'core':
                continue
            # Brute-force range query: O(n) per point, O(n^2) overall.
            points_within_area = set()
            for other in self.points:
                other = tuple(other.tolist())
                if self.in_circle(point, other, self.eps):
                    points_within_area.add(other)
            if len(points_within_area) >= self.min_samples:
                self.point_status[point] = 'core'
                # Neighbours become borders; a prior 'outlier' verdict is
                # upgraded, but existing 'core'/'border' labels are kept.
                for neighbour in points_within_area:
                    if self.point_status.get(neighbour, 'outlier') == 'outlier':
                        self.point_status[neighbour] = 'border'
                cores[point] = points_within_area
            elif self.point_status.get(point) != 'border':
                self.point_status[point] = 'outlier'
        return cores

    def link_cores(self):
        """Chain cores that fall within eps of their sorted successor.

        Fixes vs. the original: an empty or singleton core set no longer
        returns None (which crashed ``grow_clusters``), and the last core is
        no longer dropped when it starts a cluster of its own.

        Returns
        -------
        dict
            Maps cluster labels ('1', '2', ...) to sets of core points.
        """
        clusters = dict()
        cores_list = sorted(self.cores.keys())
        n_cluster = 1
        linked_cores = set()
        for i, core in enumerate(cores_list):
            linked_cores.add(core)
            # Keep extending the chain while the next sorted core is reachable.
            if i + 1 < len(cores_list) and self.in_circle(core, cores_list[i + 1], self.eps):
                continue
            # Chain broken (or list exhausted): close the current cluster.
            clusters[str(n_cluster)] = linked_cores
            linked_cores = set()
            n_cluster += 1
        return clusters

    def grow_clusters(self):
        """Attach border points (neighbourhood minus cores) to each cluster."""
        result = dict()
        for n_cluster, cores_list in enumerate(self.clustered_cores.values(), start=1):
            borders = set()
            for core in cores_list:
                borders.update(self.cores[core])
            result[str(n_cluster)] = {
                'cores': cores_list,
                'borders': borders.difference(cores_list),
            }
        return result

    def show_plot(self):
        """Scatter-plot every cluster (cores + borders) plus the outliers."""
        data_framed = []
        for n, cluster in self.scan.items():
            for kind, dots in cluster.items():
                for dot in dots:
                    dot = list(dot)
                    dot.append(n)
                    dot.append(kind)
                    data_framed.append(dot)
        # Outliers are plotted as a pseudo-cluster '0'.
        outliers = [[k[0], k[1], '0', 'outlier']
                    for k, v in self.point_status.items()
                    if v == 'outlier' and k not in self.cores.keys()]
        data_framed = data_framed + outliers
        df_plot = pd.DataFrame(data_framed, columns=['cases', 'deaths', 'cluster', 'kind'])
        fig = px.scatter(df_plot, x='cases', y='deaths', color='cluster',
                         color_discrete_sequence=px.colors.qualitative.Plotly[:len(self.scan.items())],
                         title='DBSCAN')
        fig.show()

    def show_cores_plot(self):
        """Scatter-plot cores (large), their borders (small), outliers (tiny)."""
        data_framed = []
        for core, borders in self.cores.items():
            core = list(core)
            core.append(self.eps * 2)  # marker size encodes the point's role
            data_framed.append(core)
            for border in borders:
                border = list(border)
                border.append(3)
                data_framed.append(border)
        outliers = [[k[0], k[1], 1]
                    for k, v in self.point_status.items()
                    if v == 'outlier' and k not in self.cores.keys()]
        data_framed = data_framed + outliers
        df_plot = pd.DataFrame(data_framed, columns=['cases', 'deaths', 'kind'])
        fig = px.scatter(df_plot, x='cases', y='deaths', size='kind', color='kind',
                         color_discrete_sequence=px.colors.qualitative.Plotly[:len(self.scan.items())],
                         title='CORES')
        fig.show()
# Fit the hand-rolled DBSCAN on the (cases, deaths) pairs and draw both views.
features = df[['cases', 'deaths']].values
sol = MyDBSCAN(eps=eps, min_samples=min_samples, data=features, drop_duplicates=False)
sol.dbscan()
sol.show_cores_plot()
sol.show_plot()
from sklearn.cluster import DBSCAN
from sklearn import metrics
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
# #############################################################################
# Generate sample data
X = StandardScaler().fit_transform(df[['cases', 'deaths']].values)
# #############################################################################
# Compute DBSCAN (scikit-learn reference implementation, for comparison)
db = DBSCAN(eps=0.3, min_samples=10).fit(X)
labels = db.labels_
# Boolean mask flagging which samples sklearn marked as cores.
core_samples_mask = np.zeros_like(labels, dtype=bool)
core_samples_mask[db.core_sample_indices_] = True
# Number of clusters in labels, ignoring noise if present (noise label is -1).
n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
n_noise_ = list(labels).count(-1)
print('Estimated number of clusters: %d' % n_clusters_)
print('Estimated number of noise points: %d' % n_noise_)
#print("Silhouette Coefficient: %0.3f"
#      % metrics.silhouette_score(X, labels))
# #############################################################################
# Black removed and is used for noise instead.
unique_labels = set(labels)
palette = [plt.cm.Spectral(step) for step in np.linspace(0, 1, len(unique_labels))]
for label, color in zip(unique_labels, palette):
    if label == -1:
        # Black used for noise.
        color = [0, 0, 0, 1]
    member_mask = (labels == label)
    # Core samples are drawn large, remaining cluster members small.
    core_xy = X[member_mask & core_samples_mask]
    plt.plot(core_xy[:, 0], core_xy[:, 1], 'o', markerfacecolor=tuple(color),
             markeredgecolor='k', markersize=14)
    edge_xy = X[member_mask & ~core_samples_mask]
    plt.plot(edge_xy[:, 0], edge_xy[:, 1], 'o', markerfacecolor=tuple(color),
             markeredgecolor='k', markersize=6)
fig = plt.gcf()
plt.title('Estimated number of clusters: %d' % n_clusters_)
fig.set_size_inches(20, 10)
plt.show()